{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Building Elasticsearch APIs with FastAPI\n",
    "\n",
    "This notebook shows how to build an Elasticsearch API with FastAPI with best practices. This notebook is based on the article [Building Elasticsearch APIs with FastAPI](https://www.elastic.co/search-labs/blog/building-elasticsearch-apis-with-fastapi)\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Installing dependencies and importing packages"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install fastapi uvicorn elasticsearch pydantic -q"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "import asyncio\n",
    "import json\n",
    "import os\n",
    "import nest_asyncio\n",
    "import uvicorn\n",
    "\n",
    "from getpass import getpass\n",
    "from typing import List\n",
    "from elasticsearch import Elasticsearch, helpers\n",
    "from fastapi import BackgroundTasks, Body, FastAPI, HTTPException, Response\n",
    "from pydantic import BaseModel"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Declaring variables"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "os.environ[\"ELASTICSEARCH_ENDPOINT\"] = getpass(\"Elasticsearch endpoint: \")\n",
    "os.environ[\"ELASTICSEARCH_API_KEY\"] = getpass(\"Elasticsearch api-key: \")\n",
    "\n",
    "nest_asyncio.apply()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Instance a Elasticsearch client"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "ES_INDEX = \"vet-visits\"\n",
    "\n",
    "es_client = Elasticsearch(\n",
    "    hosts=[os.environ[\"ELASTICSEARCH_ENDPOINT\"]],\n",
    "    api_key=os.environ[\"ELASTICSEARCH_API_KEY\"],\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Indexing data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    es_client.indices.create(\n",
    "        index=ES_INDEX,\n",
    "        body={\n",
    "            \"mappings\": {\n",
    "                \"properties\": {\n",
    "                    \"breed\": {\"type\": \"keyword\"},\n",
    "                    \"owner_name\": {\n",
    "                        \"type\": \"text\",\n",
    "                        \"fields\": {\"keyword\": {\"type\": \"keyword\"}},\n",
    "                    },\n",
    "                    \"pet_name\": {\n",
    "                        \"type\": \"text\",\n",
    "                        \"fields\": {\"keyword\": {\"type\": \"keyword\"}},\n",
    "                    },\n",
    "                    \"species\": {\"type\": \"keyword\"},\n",
    "                    \"vaccination_history\": {\"type\": \"keyword\"},\n",
    "                    \"visit_details\": {\"type\": \"text\"},\n",
    "                }\n",
    "            }\n",
    "        },\n",
    "    )\n",
    "\n",
    "    print(f\"Index '{ES_INDEX}' created.\")\n",
    "except Exception as e:\n",
    "    print(f\"Error al crear el índice '{ES_INDEX}': {e}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def build_data(ndjson_file, index_name):\n",
    "    with open(ndjson_file, \"r\") as f:\n",
    "        for line in f:\n",
    "            doc = json.loads(line)\n",
    "            yield {\"_index\": index_name, \"_source\": doc}\n",
    "\n",
    "\n",
    "try:\n",
    "    success, errors = helpers.bulk(es_client, build_data(\"vet-visits.ndjson\", ES_INDEX))\n",
    "    print(f\"{success} documents indexed successfully\")\n",
    "\n",
    "    if errors:\n",
    "        print(\"Errors during indexing:\", errors)\n",
    "except Exception as e:\n",
    "    print(f\"Error: {str(e)}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## FastAPI setup"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "app = FastAPI()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pydantic models for the request and response"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Pydantic model for the request\n",
    "class SearchRequest(BaseModel):\n",
    "    term: str\n",
    "    size: int = 10\n",
    "\n",
    "\n",
    "# Format for hits\n",
    "class SearchHit(BaseModel):\n",
    "    owner_name: str = \"\"\n",
    "    visit_details: str = \"\"\n",
    "\n",
    "\n",
    "# Pydantic model for the response\n",
    "class SearchResponse(BaseModel):\n",
    "    hits: List[SearchHit]\n",
    "    total: int"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Endpoints"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "@app.get(\"/ping\")\n",
    "async def ping():\n",
    "    try:\n",
    "        health = await es_client.cluster.health()\n",
    "\n",
    "        return {\n",
    "            \"status\": \"success\",\n",
    "            \"message\": \"Connected to Elasticsearch\",\n",
    "            \"cluster_status\": health[\"status\"],\n",
    "            \"number_of_nodes\": health[\"number_of_nodes\"],\n",
    "            \"active_shards\": health[\"active_shards\"],\n",
    "        }\n",
    "    except Exception as e:\n",
    "        status_code = getattr(e, \"status_code\", 500)\n",
    "\n",
    "        raise HTTPException(\n",
    "            status_code=status_code,\n",
    "            detail=f\"Error connecting to Elasticsearch: {str(e)}\",\n",
    "        )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Search endpoint without Pydantic models"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "@app.post(\"/search\")\n",
    "async def search(query: dict = Body(...)):\n",
    "    try:\n",
    "        result = await es_client.search(index=ES_INDEX, body=query)\n",
    "\n",
    "        return result\n",
    "    except Exception as e:\n",
    "        status_code = getattr(e, \"status_code\", 500)\n",
    "\n",
    "        raise HTTPException(status_code=status_code, detail=str(e))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Search endpoint with Pydantic models"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Using Pydantic models for request/response validation\n",
    "@app.post(\"/search\", response_model=SearchResponse)\n",
    "async def search_v3(request: SearchRequest):\n",
    "    try:\n",
    "        query = {\n",
    "            \"query\": {\"match_phrase\": {\"visit_details\": request.term}},\n",
    "            \"size\": request.size,\n",
    "        }\n",
    "\n",
    "        result = await es_client.search(index=ES_INDEX, body=query)\n",
    "        hits = result[\"hits\"][\"hits\"]\n",
    "        results = []\n",
    "\n",
    "        for hit in hits:\n",
    "            source = hit.get(\"_source\", {})\n",
    "            results.append(\n",
    "                SearchHit(\n",
    "                    owner_name=source[\"owner_name\"],\n",
    "                    visit_details=source[\"visit_details\"],\n",
    "                )\n",
    "            )\n",
    "\n",
    "        return SearchResponse(hits=results, total=len(results))\n",
    "    except Exception as e:\n",
    "        status_code = getattr(e, \"status_code\", 500)\n",
    "\n",
    "        raise HTTPException(status_code=status_code, detail=str(e))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [],
   "source": [
    "async def check_task(es_client, task_id):\n",
    "    try:\n",
    "        while True:\n",
    "            status = await es_client.tasks.get(task_id=task_id)\n",
    "            if status.get(\"completed\", False):\n",
    "                print(f\"Task {task_id} completed.\")\n",
    "                # Here should be the logic to send the email\n",
    "                break\n",
    "\n",
    "            await asyncio.sleep(2)\n",
    "    except Exception as e:\n",
    "        print(f\"Error checking task {task_id}: {e}\")\n",
    "\n",
    "\n",
    "# Background task endpoint\n",
    "@app.post(\"/delete-by-query\")\n",
    "async def delete_by_query(\n",
    "    request: SearchRequest = Body(...), background_tasks: BackgroundTasks = None\n",
    "):\n",
    "    try:\n",
    "        body = {\"query\": {\"term\": {\"pet_name.keyword\": request.term}}}\n",
    "\n",
    "        response = await es_client.delete_by_query(\n",
    "            index=ES_INDEX, body=body, wait_for_completion=False\n",
    "        )\n",
    "\n",
    "        task_id = response.get(\"task\")\n",
    "        if task_id:\n",
    "            background_tasks.add_task(check_task, es_client, task_id)\n",
    "\n",
    "        return Response(\n",
    "            status_code=200,\n",
    "            content=json.dumps(\n",
    "                {\n",
    "                    \"message\": \"Delete by query. The response will be send by email when the task is completed.\",\n",
    "                    \"task_id\": task_id,\n",
    "                }\n",
    "            ),\n",
    "            media_type=\"application/json\",\n",
    "        )\n",
    "    except Exception as e:\n",
    "        status_code = getattr(e, \"status_code\", 500)\n",
    "\n",
    "        raise HTTPException(status_code=status_code, detail=str(e))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if __name__ == \"__main__\":\n",
    "    uvicorn.run(app, host=\"0.0.0.0\", port=8000)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Clean up"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "es_client.indices.delete(index=ES_INDEX, ignore=[400, 404])"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
